from pyflink.table import EnvironmentSettings, TableEnvironment
import os

# Build a streaming-mode Table API environment; every statement below runs
# through this single `table_env` instance.
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(environment_settings=env_settings)

# Source table: JSON records from the Kafka topic `ods_user`, consumed from
# the earliest available offset.
ods_user_ddl = """
CREATE TABLE ods_user (
  user_id STRING,
  name STRING,
  gender STRING,
  age INT,
  email STRING,
  address STRING
) WITH (
  'connector'='kafka',
  'topic'='ods_user',
  'properties.bootstrap.servers'='kafka:9092',
  'format'='json',
  'scan.startup.mode'='earliest-offset'
)
"""
table_env.execute_sql(ods_user_ddl)

# Sink table: same six columns as the source, written through the JDBC
# connector to the `dwd_user` table at the URL below.
# NOTE(review): 'ru.yandex.clickhouse.ClickHouseDriver' looks like the legacy
# ClickHouse driver class name -- confirm it matches the driver jar that is
# actually on the Flink classpath.
uri = 'jdbc:clickhouse://clickhouse:8123/default'
dwd_user_ddl = f"""
CREATE TABLE dwd_user (
  user_id STRING,
  name STRING,
  gender STRING,
  age INT,
  email STRING,
  address STRING
) WITH (
  'connector'='jdbc',
  'url'='{uri}',
  'table-name'='dwd_user',
  'driver'='ru.yandex.clickhouse.ClickHouseDriver'
)
"""
table_env.execute_sql(dwd_user_ddl)

# Pipeline: copy every ods_user row whose age lies in 18..60 (BETWEEN is
# inclusive on both ends) into dwd_user, columns passed through unchanged.
# NOTE(review): the result of execute_sql is discarded here. If this script is
# run against a local mini-cluster it may need to wait on that result so the
# process does not exit before the job runs -- confirm the deployment mode.
insert_dwd_user_sql = """
INSERT INTO dwd_user
SELECT user_id, name, gender, age, email, address
FROM ods_user WHERE age BETWEEN 18 AND 60
"""
table_env.execute_sql(insert_dwd_user_sql)
